본문으로 건너뛰기

데이터 제공 사이트 활용

다양한 데이터 제공 사이트를 활용하면 분석에 필요한 고품질 데이터를 효율적으로 확보할 수 있다. 공공 데이터부터 전문 데이터셋까지, 각 사이트의 특성을 이해하고 활용하는 방법을 알아보자.

공공데이터 포털 완전 활용

공공데이터포털(data.go.kr) 기본 활용

import requests
import pandas as pd
import json
from urllib.parse import urlencode

class PublicDataAPI:
def __init__(self, service_key):
self.service_key = service_key
self.base_url = "http://apis.data.go.kr"

def get_population_data(self, region_code, year):
"""인구 통계 데이터 조회"""
endpoint = "/1160100/service/GetPopulationKoreanService/getPopulationKorean"

params = {
'serviceKey': self.service_key,
'numOfRows': 100,
'pageNo': 1,
'resultType': 'json',
'admCode': region_code,
'year': year
}

url = self.base_url + endpoint + '?' + urlencode(params)

try:
response = requests.get(url)
response.raise_for_status()

data = response.json()

if 'response' in data and 'body' in data['response']:
items = data['response']['body']['items']['item']
df = pd.DataFrame(items)
return df
else:
print("데이터를 찾을 수 없습니다.")
return pd.DataFrame()

except requests.RequestException as e:
print(f"API 요청 오류: {e}")
return pd.DataFrame()

def get_business_data(self, business_type, region):
"""사업체 통계 데이터 조회"""
# 실제 API 엔드포인트에 맞게 구현
pass

# 사용 예시
# api = PublicDataAPI('your_service_key_here')
# population_df = api.get_population_data('11000', '2023')

서울시 열린데이터광장 활용

import requests
import pandas as pd

class SeoulOpenData:
def __init__(self, api_key):
self.api_key = api_key
self.base_url = "http://openapi.seoul.go.kr:8088"

def get_subway_passenger_data(self, start_date, end_date):
"""지하철 승하차 데이터 조회"""
service_name = "CardSubwayStatsNew"

url = f"{self.base_url}/{self.api_key}/json/{service_name}/1/1000/{start_date}/{end_date}"

try:
response = requests.get(url)
response.raise_for_status()

data = response.json()

if service_name in data:
items = data[service_name]['row']
df = pd.DataFrame(items)

# 데이터 타입 변환
numeric_cols = ['RIDE_PASGR_NUM', 'ALIGHT_PASGR_NUM']
for col in numeric_cols:
if col in df.columns:
df[col] = pd.to_numeric(df[col], errors='coerce')

return df
else:
print("데이터를 찾을 수 없습니다.")
return pd.DataFrame()

except requests.RequestException as e:
print(f"API 요청 오류: {e}")
return pd.DataFrame()

def get_air_quality_data(self, district):
"""대기질 데이터 조회"""
service_name = "RealtimeCityAir"

url = f"{self.base_url}/{self.api_key}/json/{service_name}/1/25"

try:
response = requests.get(url)
data = response.json()

if service_name in data:
items = data[service_name]['row']
df = pd.DataFrame(items)

if district:
df = df[df['MSRSTE_NM'].str.contains(district, na=False)]

return df
else:
return pd.DataFrame()

except Exception as e:
print(f"오류 발생: {e}")
return pd.DataFrame()

# 사용 예시
# seoul_api = SeoulOpenData('your_api_key_here')
# subway_data = seoul_api.get_subway_passenger_data('20231201', '20231231')
# air_data = seoul_api.get_air_quality_data('강남구')

Kaggle 데이터셋 다운로드 자동화

Kaggle API 설정 및 활용

import kaggle
import pandas as pd
import os
import zipfile

class KaggleDataManager:
def __init__(self):
# Kaggle API 키 설정 필요 (~/.kaggle/kaggle.json)
self.api = kaggle.KaggleApi()
self.api.authenticate()

def search_datasets(self, keyword, max_size_mb=100):
"""키워드로 데이터셋 검색"""
datasets = self.api.dataset_list(search=keyword, max_size=max_size_mb*1024*1024)

dataset_info = []
for dataset in datasets:
dataset_info.append({
'title': dataset.title,
'ref': dataset.ref,
'size_mb': dataset.totalBytes / (1024*1024) if dataset.totalBytes else 0,
'download_count': dataset.downloadCount,
'vote_count': dataset.voteCount,
'usability_rating': dataset.usabilityRating
})

return pd.DataFrame(dataset_info).sort_values('usability_rating', ascending=False)

def download_dataset(self, dataset_ref, extract_path='./kaggle_data'):
"""데이터셋 다운로드 및 압축 해제"""
try:
# 디렉토리 생성
os.makedirs(extract_path, exist_ok=True)

# 데이터셋 다운로드
self.api.dataset_download_files(dataset_ref, path=extract_path, unzip=True)

print(f"다운로드 완료: {dataset_ref}")

# 다운로드된 파일 목록 반환
files = []
for root, dirs, filenames in os.walk(extract_path):
for filename in filenames:
files.append(os.path.join(root, filename))

return files

except Exception as e:
print(f"다운로드 오류: {e}")
return []

def auto_load_csv_files(self, dataset_ref):
"""데이터셋 다운로드 후 CSV 파일들을 자동으로 로드"""
files = self.download_dataset(dataset_ref)

dataframes = {}

for file_path in files:
if file_path.endswith('.csv'):
try:
filename = os.path.basename(file_path)
df = pd.read_csv(file_path)
dataframes[filename] = df
print(f"로드 완료: {filename} - {df.shape}")
except Exception as e:
print(f"파일 로드 오류 {file_path}: {e}")

return dataframes

def get_competition_data(self, competition_name):
"""Kaggle 경진대회 데이터 다운로드"""
try:
self.api.competition_download_files(competition_name, path='./competition_data')
print(f"경진대회 데이터 다운로드 완료: {competition_name}")

# ZIP 파일 압축 해제
zip_path = f"./competition_data/{competition_name}.zip"
if os.path.exists(zip_path):
with zipfile.ZipFile(zip_path, 'r') as zip_ref:
zip_ref.extractall('./competition_data')
os.remove(zip_path)

except Exception as e:
print(f"경진대회 데이터 다운로드 오류: {e}")

# 사용 예시
# kaggle_manager = KaggleDataManager()
#
# # 데이터셋 검색
# datasets = kaggle_manager.search_datasets('house prices', max_size_mb=50)
# print(datasets.head())
#
# # 데이터셋 다운로드 및 로드
# dataframes = kaggle_manager.auto_load_csv_files('paultimothymooney/chest-xray-pneumonia')

금융 데이터 API

yfinance를 활용한 주식 데이터 수집

import yfinance as yf
import pandas as pd
from datetime import datetime, timedelta

class FinanceDataCollector:
def __init__(self):
pass

def get_stock_data(self, symbols, period='1y', interval='1d'):
"""주식 데이터 수집"""
if isinstance(symbols, str):
symbols = [symbols]

stock_data = {}

for symbol in symbols:
try:
ticker = yf.Ticker(symbol)
data = ticker.history(period=period, interval=interval)
stock_data[symbol] = data
print(f"데이터 수집 완료: {symbol}")
except Exception as e:
print(f"데이터 수집 오류 {symbol}: {e}")

return stock_data

def get_multiple_stocks_comparison(self, symbols, start_date, end_date):
"""여러 주식 비교 데이터"""
data = yf.download(symbols, start=start_date, end=end_date)

# 종가만 추출하여 비교
if len(symbols) > 1:
close_prices = data['Close']
else:
close_prices = data['Close'].to_frame(symbols[0])

# 수익률 계산
returns = close_prices.pct_change().dropna()

# 누적 수익률 계산
cumulative_returns = (1 + returns).cumprod()

return {
'prices': close_prices,
'returns': returns,
'cumulative_returns': cumulative_returns
}

def get_market_indices(self):
"""주요 시장 지수 데이터"""
indices = {
'S&P 500': '^GSPC',
'NASDAQ': '^IXIC',
'DOW': '^DJI',
'KOSPI': '^KS11',
'KOSDAQ': '^KQ11'
}

index_data = {}

for name, symbol in indices.items():
try:
ticker = yf.Ticker(symbol)
data = ticker.history(period='1y')
index_data[name] = data
except Exception as e:
print(f"지수 데이터 수집 오류 {name}: {e}")

return index_data

def get_company_info(self, symbol):
"""기업 정보 조회"""
ticker = yf.Ticker(symbol)

info = ticker.info

# 주요 정보만 추출
key_info = {
'company_name': info.get('longName', 'N/A'),
'sector': info.get('sector', 'N/A'),
'industry': info.get('industry', 'N/A'),
'market_cap': info.get('marketCap', 'N/A'),
'pe_ratio': info.get('trailingPE', 'N/A'),
'dividend_yield': info.get('dividendYield', 'N/A'),
'beta': info.get('beta', 'N/A')
}

return key_info

# 사용 예시
# finance_collector = FinanceDataCollector()
#
# # 개별 주식 데이터
# apple_data = finance_collector.get_stock_data('AAPL', period='2y')
#
# # 여러 주식 비교
# comparison = finance_collector.get_multiple_stocks_comparison(
# ['AAPL', 'GOOGL', 'MSFT'],
# '2023-01-01',
# '2023-12-31'
# )
#
# # 시장 지수 데이터
# indices = finance_collector.get_market_indices()

한국투자증권 API 활용

import requests
import pandas as pd
import json

class KISDataAPI:
def __init__(self, app_key, app_secret):
self.app_key = app_key
self.app_secret = app_secret
self.base_url = "https://openapi.koreainvestment.com:9443"
self.access_token = None
self.get_access_token()

def get_access_token(self):
"""접근 토큰 발급"""
url = f"{self.base_url}/oauth2/tokenP"

headers = {
"content-type": "application/json"
}

data = {
"grant_type": "client_credentials",
"appkey": self.app_key,
"appsecret": self.app_secret
}

try:
response = requests.post(url, headers=headers, data=json.dumps(data))
response.raise_for_status()

result = response.json()
self.access_token = result['access_token']
print("접근 토큰 발급 완료")

except Exception as e:
print(f"토큰 발급 오류: {e}")

def get_stock_price(self, stock_code):
"""주식 현재가 조회"""
url = f"{self.base_url}/uapi/domestic-stock/v1/quotations/inquire-price"

headers = {
"authorization": f"Bearer {self.access_token}",
"appkey": self.app_key,
"appsecret": self.app_secret,
"tr_id": "FHKST01010100"
}

params = {
"fid_cond_mrkt_div_code": "J",
"fid_input_iscd": stock_code
}

try:
response = requests.get(url, headers=headers, params=params)
response.raise_for_status()

data = response.json()

if data['rt_cd'] == '0':
output = data['output']
return {
'stock_code': stock_code,
'stock_name': output['hts_kor_isnm'],
'current_price': int(output['stck_prpr']),
'change_rate': float(output['prdy_ctrt']),
'volume': int(output['acml_vol'])
}
else:
print(f"데이터 조회 실패: {data['msg1']}")
return None

except Exception as e:
print(f"주식 가격 조회 오류: {e}")
return None

# 사용 예시 (실제 API 키 필요)
# kis_api = KISDataAPI('your_app_key', 'your_app_secret')
# samsung_price = kis_api.get_stock_price('005930') # 삼성전자

소셜미디어 데이터 수집 기초

Twitter API v2 활용

import tweepy
import pandas as pd
from datetime import datetime, timedelta

class TwitterDataCollector:
def __init__(self, bearer_token):
self.client = tweepy.Client(bearer_token=bearer_token)

def search_tweets(self, query, max_results=100, lang='ko'):
"""트윗 검색"""
try:
tweets = tweepy.Paginator(
self.client.search_recent_tweets,
query=query,
max_results=max_results,
tweet_fields=['created_at', 'author_id', 'public_metrics', 'lang']
).flatten(limit=max_results)

tweet_data = []

for tweet in tweets:
if tweet.lang == lang: # 언어 필터링
tweet_data.append({
'id': tweet.id,
'text': tweet.text,
'created_at': tweet.created_at,
'author_id': tweet.author_id,
'retweet_count': tweet.public_metrics['retweet_count'],
'like_count': tweet.public_metrics['like_count'],
'reply_count': tweet.public_metrics['reply_count']
})

return pd.DataFrame(tweet_data)

except Exception as e:
print(f"트윗 검색 오류: {e}")
return pd.DataFrame()

def analyze_tweet_sentiment(self, df):
"""간단한 감정 분석 (키워드 기반)"""
positive_words = ['좋다', '훌륭하다', '최고', '감사', '행복', '만족']
negative_words = ['나쁘다', '최악', '실망', '화나다', '짜증', '불만']

def get_sentiment(text):
positive_count = sum(1 for word in positive_words if word in text)
negative_count = sum(1 for word in negative_words if word in text)

if positive_count > negative_count:
return 'positive'
elif negative_count > positive_count:
return 'negative'
else:
return 'neutral'

df['sentiment'] = df['text'].apply(get_sentiment)

return df

# 사용 예시 (실제 Bearer Token 필요)
# twitter_collector = TwitterDataCollector('your_bearer_token')
# tweets_df = twitter_collector.search_tweets('#데이터분석', max_results=50)
# analyzed_tweets = twitter_collector.analyze_tweet_sentiment(tweets_df)

데이터 품질 검증 및 전처리

수집된 데이터 품질 체크

def validate_collected_data(df, source_name):
"""수집된 데이터의 품질을 검증"""

quality_report = {
'source': source_name,
'total_rows': len(df),
'total_columns': len(df.columns),
'missing_data': {},
'data_types': {},
'duplicates': 0,
'quality_score': 0
}

# 결측치 확인
for col in df.columns:
missing_count = df[col].isnull().sum()
missing_percent = (missing_count / len(df)) * 100
quality_report['missing_data'][col] = {
'count': missing_count,
'percentage': round(missing_percent, 2)
}

# 데이터 타입 확인
quality_report['data_types'] = df.dtypes.to_dict()

# 중복 데이터 확인
quality_report['duplicates'] = df.duplicated().sum()

# 품질 점수 계산 (간단한 예시)
missing_penalty = sum([info['percentage'] for info in quality_report['missing_data'].values()]) / len(df.columns)
duplicate_penalty = (quality_report['duplicates'] / len(df)) * 100

quality_report['quality_score'] = max(0, 100 - missing_penalty - duplicate_penalty)

return quality_report

def clean_collected_data(df):
"""수집된 데이터 기본 정리"""

# 1. 중복 제거
df_cleaned = df.drop_duplicates()

# 2. 공백 문자 정리
for col in df_cleaned.select_dtypes(include=['object']).columns:
df_cleaned[col] = df_cleaned[col].astype(str).str.strip()

# 3. 날짜 컬럼 자동 감지 및 변환
for col in df_cleaned.columns:
if 'date' in col.lower() or 'time' in col.lower():
try:
df_cleaned[col] = pd.to_datetime(df_cleaned[col], errors='coerce')
except:
pass

# 4. 숫자 컬럼 자동 감지 및 변환
for col in df_cleaned.select_dtypes(include=['object']).columns:
try:
# 숫자로 변환 가능한지 확인
numeric_series = pd.to_numeric(df_cleaned[col], errors='coerce')
if numeric_series.notna().sum() > len(df_cleaned) * 0.8: # 80% 이상이 숫자라면
df_cleaned[col] = numeric_series
except:
pass

return df_cleaned

# 사용 예시
# quality_report = validate_collected_data(kaggle_df, 'Kaggle Dataset')
# cleaned_df = clean_collected_data(kaggle_df)

정리

다양한 데이터 제공 사이트를 효과적으로 활용하면 분석에 필요한 풍부한 데이터를 확보할 수 있다. 각 사이트의 API 특성을 이해하고, 데이터 품질을 검증하며, 적절한 전처리를 수행하는 것이 중요하다. 특히 공공 데이터와 금융 데이터는 신뢰할 수 있는 분석의 기반이 되므로 적극 활용하는 것이 좋다.

다음 섹션에서는 AI를 활용한 데이터 수집에 대해 알아보겠다.